
Websocket LLmProviders #159

Open
akavi wants to merge 7 commits into main from akavi/websocket-providers

Conversation

@akavi
Collaborator

@akavi akavi commented Feb 25, 2026

What does this PR do?

WebSocket APIs are noticeably faster for certain models, most notably gpt-realtime-1.5 and gpt-5.2:

Model                                    Connect      Cold T1      Cold T2      Warm T1      Warm T2
                                        avg±std ms   avg±std ms   avg±std ms   avg±std ms   avg±std ms
  ----------------------------------- ------------ ------------ ------------ ------------ ------------
  realtime / gpt-realtime-1.5             1164±122       517±39       493±41            —            —
  ws-mode / gpt-5.2                       1177±170       696±64       675±54       689±44       699±85
  ws-mode / gpt-5-mini                     1083±72     2025±601    4432±1076     1705±285    4244±1350 (2 err)
  ws-mode / gpt-5-nano                    1185±113    3793±1628    6688±2260    3291±1073    6162±1247

Unfortunately:

  1. LiteLlm doesn't support them
  2. OpenAI has two separate WS APIs, one for realtime and one for "websocket mode". IDK why!

Unfortunately, it's not super straightforward to add support for both, but I've done it.

We hide the choice of implementation behind the LlmProvider facade, so it's seamless from the developer's PoV.

This is a pretty substantial PR, so I've split it into individual commits:

  1. Extract tool merging/resolution from LlmAgent into tools/utils.py: We want to reuse these in the providers
  2. Split provider.py into LlmProvider facade and HttpProvider backend: Prep for introducing new websocket/realtime providers
  3. Add RealtimeProvider and WebSocketProvider backends: The meat of the PR, actually adding the support
  4. Add bench_latency.py for LLM provider latency benchmarking (to verify these providers are actually lower latency)
  5. Support Python 3.9: update the providers to correctly support Python 3.9, which requires lazy initialization of asyncio primitives
  6. Centralize configuration detection: I should've probably done this in 2, but I didn't, so doing it here
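The lazy-initialization point in commit 5 can be sketched as follows. This is a minimal illustration, not the PR's actual code; `LazyLockHolder` is a hypothetical name:

```python
import asyncio
from typing import Optional


class LazyLockHolder:
    """Sketch of lazy asyncio-primitive initialization.

    On Python 3.9, asyncio.Lock()/asyncio.Queue() bind to the event loop
    available at construction time, so creating them in __init__ (often
    outside any running loop) can fail or bind to the wrong loop. Creating
    them lazily, on first access from inside a coroutine, avoids this.
    """

    def __init__(self) -> None:
        self._lock: Optional[asyncio.Lock] = None

    @property
    def lock(self) -> asyncio.Lock:
        # First access happens inside the running loop, so the Lock
        # binds to the correct loop even on Python 3.9.
        if self._lock is None:
            self._lock = asyncio.Lock()
        return self._lock


async def main() -> str:
    holder = LazyLockHolder()
    async with holder.lock:
        return "acquired"


print(asyncio.run(main()))  # prints: acquired
```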

Type of change

  • Bug fix
  • New feature
  • Breaking change
  • Documentation
  • Other: ___________

Testing

Unit tests + "real" provider tests

Checklist

  • I have read the contributing guidelines
  • I have added tests that prove my fix is effective or that my feature works
  • I have formatted my code with make format

Note

High Risk
A large refactor of core LLM streaming/provider plumbing that introduces persistent WebSocket protocols with new routing/fallback logic; issues here can impact all agent responses, tool calling, and turn lifecycle behavior.

Overview
Adds first-class WebSocket-based LLM backends for OpenAI: a Realtime provider with diff-sync (realtime_provider.py) and a Responses-API WebSocket provider with continuation/divergence handling (websocket_provider.py), plus shared WS stream utilities (stream.py).

Refactors provider.py into a LlmProvider facade that normalizes config/tools, selects HTTP vs WS backends (including HTTP fallback when WS can’t honor certain LlmConfig fields), and centralizes model capability detection (_get_model_config). LlmAgent is updated to use the facade, warm providers on CallStarted, lazily initialize asyncio primitives for Python 3.9, and move tool merging/resolution into tools/utils.py (including native vs fallback web_search behavior).

Updates examples/tests/scripts to the new LlmProvider API and streaming semantics (no async with), adds latency benchmarking and provider test scripts, and expands unit tests to cover backend routing, warmup/tool propagation, model validation, and WS/realtime-specific behaviors.
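The HTTP-vs-WS backend selection described above can be sketched roughly as follows. This is a simplified assumption based on the summary, not the PR's actual routing; the real logic lives inside LlmProvider with its own detection helpers:

```python
from typing import Optional


def select_backend(model: str, backend: Optional[str] = None) -> str:
    """Illustrative sketch of facade backend routing.

    An explicit `backend` argument wins; otherwise the model name
    determines the backend, with HTTP as the fallback. The detection
    rules below are simplified assumptions.
    """

    def is_realtime(m: str) -> bool:
        return "realtime" in m.lower()

    def is_websocket(m: str) -> bool:
        low = m.lower()
        return low.startswith("gpt-5.2") or low.startswith("gpt5.2")

    if backend == "realtime" or (backend is None and is_realtime(model)):
        return "realtime"
    if backend == "websocket" or (backend is None and is_websocket(model)):
        return "websocket"
    return "http"  # HTTP fallback for everything else
```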

Written by Cursor Bugbot for commit c571e05. This will update automatically on new commits.

@akavi akavi requested review from lucyliulee and sauhardjain and removed request for lucyliulee February 25, 2026 02:02
@akavi akavi changed the title Realtime providers Websocket LLmProviders Feb 25, 2026
@akavi akavi force-pushed the akavi/websocket-providers branch from e98aadd to 8022743 Compare February 25, 2026 20:15
@akavi akavi force-pushed the akavi/websocket-providers branch from 8022743 to 598f448 Compare February 26, 2026 00:33
@akavi akavi force-pushed the akavi/websocket-providers branch from 598f448 to 08e8a93 Compare March 4, 2026 01:24

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 3 potential issues.

Autofix Details

Bugbot Autofix prepared fixes for all 3 issues found in the latest run.

  • ✅ Fixed: Benchmark script uses removed async-with stream pattern
    • Updated stream_turn to iterate directly over provider.chat(...) with async for, matching the stream interface that only implements __aiter__.
  • ✅ Fixed: Message identity only considers first tool call
    • Message identities now include all assistant tool calls and the Realtime diff path expands multi-tool-call assistant messages into per-call items so no tool calls are dropped.
  • ✅ Fixed: Double normalization of tools in LlmAgent+LlmProvider pipeline
    • Added a fast-path tool resolver in LlmProvider that reuses already-normalized FunctionTool inputs and only calls _normalize_tools when needed.


Or push these changes by commenting:

@cursor push 471fcca35d
Preview (471fcca35d)
diff --git a/line/llm_agent/provider.py b/line/llm_agent/provider.py
--- a/line/llm_agent/provider.py
+++ b/line/llm_agent/provider.py
@@ -14,7 +14,7 @@
 from typing import Any, List, Optional, Protocol, Tuple, runtime_checkable
 
 from line.llm_agent.config import LlmConfig, _normalize_config
-from line.llm_agent.tools.utils import _normalize_tools
+from line.llm_agent.tools.utils import FunctionTool, _normalize_tools
 
 
 @dataclass
@@ -105,9 +105,8 @@
     ):
         self._model = model
         normalized_config = _normalize_config(config or LlmConfig())
-        normalized_tools, _ = _normalize_tools(tools, model=model) if tools else (None, None)
         self._config = normalized_config
-        self._tools = normalized_tools or []
+        self._tools = _resolve_tools(tools, model=model)
 
         use_realtime = backend == "realtime" or (backend is None and _is_realtime_model(model))
         use_websocket = backend == "websocket" or (backend is None and _is_websocket_model(model))
@@ -140,7 +139,7 @@
 
     def chat(self, messages, tools=None, config=None, **kwargs):
         cfg = _normalize_config(config) if config else self._config
-        effective_tools = _normalize_tools(tools, model=self._model)[0] if tools else self._tools
+        effective_tools = _resolve_tools(tools, model=self._model) if tools else self._tools
         return self._backend.chat(messages, effective_tools, config=cfg, **kwargs)
 
     async def warmup(self, config=None):
@@ -199,18 +198,29 @@
     return lower.startswith("gpt-5.2") or lower.startswith("gpt5.2")
 
 
+def _resolve_tools(tools: Optional[List[Any]], model: str) -> List[FunctionTool]:
+    """Resolve tools to FunctionTools, avoiding no-op re-normalization."""
+    if not tools:
+        return []
+    if all(isinstance(tool, FunctionTool) for tool in tools):
+        return list(tools)
+    return _normalize_tools(tools, model=model)[0]
+
+
 def _message_identity(msg: Message) -> tuple:
     """Compute an identity fingerprint for a single Message.
 
     Used by both WebSocket providers for divergence detection / diff-sync.
 
-    For assistant messages with tool calls, identity is derived from the
-    *first* tool call (mirrors how the server tracks multi-tool-call turns
-    as a single logical unit).
+    For assistant messages with tool calls, identity includes all tool calls
+    so divergence checks detect changes to any call in the turn.
     """
     if msg.tool_calls:
-        tc = msg.tool_calls[0]
-        return ("assistant_tool_call", tc.name, tc.arguments, tc.id)
+        if len(msg.tool_calls) == 1:
+            tc = msg.tool_calls[0]
+            return ("assistant_tool_call", tc.name, tc.arguments, tc.id)
+        tool_calls_key = tuple((tc.name, tc.arguments, tc.id) for tc in msg.tool_calls)
+        return ("assistant_tool_calls", tool_calls_key)
     return (msg.role, msg.content or "", msg.tool_call_id or "", msg.name or "")
 
 

diff --git a/line/llm_agent/realtime_provider.py b/line/llm_agent/realtime_provider.py
--- a/line/llm_agent/realtime_provider.py
+++ b/line/llm_agent/realtime_provider.py
@@ -399,11 +399,8 @@
 def _message_to_item(msg: Message) -> Dict[str, Any]:
     """Convert a Message to a Realtime API conversation item dict.
 
-    Note: for assistant messages with multiple tool calls, only the first
-    tool call is converted.  The Realtime API represents each tool call as a
-    separate conversation item, but the diff algorithm tracks identity at the
-    message level.  Handling multi-tool-call expansion here would require
-    reworking the diff model.
+    Assistant tool-call messages must contain exactly one tool call; callers
+    are responsible for expanding multi-tool-call turns into separate messages.
     """
     if msg.role == "user":
         return {
@@ -414,13 +411,8 @@
 
     if msg.role == "assistant":
         if msg.tool_calls:
-            if len(msg.tool_calls) > 1:
-                logger.warning(
-                    "Realtime API: assistant message has %d tool calls but only "
-                    "the first is converted (dropping %s)",
-                    len(msg.tool_calls),
-                    [tc.name for tc in msg.tool_calls[1:]],
-                )
+            if len(msg.tool_calls) != 1:
+                raise ValueError("Assistant tool-call message must contain exactly one tool call")
             tc = msg.tool_calls[0]
             return {
                 "type": "function_call",
@@ -464,7 +456,19 @@
         if msg.role == "system":
             system_parts.append(msg.content or "")
         else:
-            non_system.append(msg)
+            if msg.role == "assistant" and msg.tool_calls and len(msg.tool_calls) > 1:
+                for tc in msg.tool_calls:
+                    non_system.append(
+                        Message(
+                            role="assistant",
+                            content=msg.content,
+                            tool_calls=[tc],
+                            tool_call_id=msg.tool_call_id,
+                            name=msg.name,
+                        )
+                    )
+            else:
+                non_system.append(msg)
 
     desired_instructions = "\n\n".join(system_parts) if system_parts else None
 

diff --git a/line/llm_agent/scripts/bench_latency.py b/line/llm_agent/scripts/bench_latency.py
--- a/line/llm_agent/scripts/bench_latency.py
+++ b/line/llm_agent/scripts/bench_latency.py
@@ -164,12 +164,11 @@
     ttft = None
     text_parts: list[str] = []
 
-    async with provider.chat(messages, config=config) as stream:
-        async for chunk in stream:
-            if chunk.text:
-                if ttft is None:
-                    ttft = (time.perf_counter() - t0) * 1000
-                text_parts.append(chunk.text)
+    async for chunk in provider.chat(messages, config=config):
+        if chunk.text:
+            if ttft is None:
+                ttft = (time.perf_counter() - t0) * 1000
+            text_parts.append(chunk.text)
 
     total = (time.perf_counter() - t0) * 1000
     return TurnResult(

diff --git a/line/llm_agent/websocket_provider.py b/line/llm_agent/websocket_provider.py
--- a/line/llm_agent/websocket_provider.py
+++ b/line/llm_agent/websocket_provider.py
@@ -447,19 +447,26 @@
 def _extract_model_output_identity(response: Dict[str, Any]) -> Optional[tuple]:
     """Derive a single message-level identity from a Responses API output.
 
-    Mirrors ``_message_identity``: if the model produced tool calls we key
-    on the first one; otherwise we key on the full text.
+    Mirrors ``_message_identity``: single-tool-call outputs use a compact key,
+    while multi-tool-call outputs include every call in order.
     """
     output_items = response.get("output", [])
     function_calls = [i for i in output_items if i.get("type") == "function_call"]
 
     if function_calls:
-        fc = function_calls[0]
+        if len(function_calls) == 1:
+            fc = function_calls[0]
+            return (
+                "assistant_tool_call",
+                fc.get("name", ""),
+                fc.get("arguments", ""),
+                fc.get("call_id", ""),
+            )
         return (
-            "assistant_tool_call",
-            fc.get("name", ""),
-            fc.get("arguments", ""),
-            fc.get("call_id", ""),
+            "assistant_tool_calls",
+            tuple(
+                (fc.get("name", ""), fc.get("arguments", ""), fc.get("call_id", "")) for fc in function_calls
+            ),
         )
 
     # Concatenate text across all message output items.
This Bugbot Autofix run was free. To enable autofix for future PRs, go to the Cursor dashboard.

@akavi akavi force-pushed the akavi/websocket-providers branch from 08e8a93 to e97ad49 Compare March 4, 2026 18:30
@akavi akavi force-pushed the akavi/websocket-providers branch from e97ad49 to dc4c1a9 Compare March 5, 2026 22:20
@akavi akavi force-pushed the akavi/websocket-providers branch from dc4c1a9 to 67069c3 Compare March 6, 2026 17:30
@akavi akavi force-pushed the akavi/websocket-providers branch 2 times, most recently from 149738e to 7a2463b Compare March 7, 2026 00:50
@akavi akavi force-pushed the akavi/websocket-providers branch 3 times, most recently from 3a53537 to a8f738f Compare March 9, 2026 21:26

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Autofix Details

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: x
    • x


Or push these changes by commenting:

@cursor push d109cf302a
Preview (d109cf302a)
diff --git a/line/llm_agent/llm_agent.py b/line/llm_agent/llm_agent.py
--- a/line/llm_agent/llm_agent.py
+++ b/line/llm_agent/llm_agent.py
@@ -311,7 +311,7 @@
 
             stream = self._llm.chat(
                 messages,
-                tools or None,
+                tools,
                 config=config,
                 **chat_kwargs,
             )

@akavi akavi requested review from lucyliulee and sauhardjain March 10, 2026 00:23
@sauhardjain
Collaborator

I will take another commit-by-commit pass because this is pretty large surface area.

@akavi akavi force-pushed the akavi/websocket-providers branch from 4e0603c to 3a00fba Compare March 11, 2026 21:10
akavi and others added 7 commits March 11, 2026 14:53
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Python 3.9 requires asyncio primitives (locks/queues) to be initialized inside the
context of a running event loop. The loop isn't available at initialization time.

Fix: lazy initialization
@akavi akavi force-pushed the akavi/websocket-providers branch from 3a00fba to c571e05 Compare March 11, 2026 21:57

@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 2 potential issues.

Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

# Lock ownership transfers to the stream — released in __aexit__
def on_response_done(response: Dict[str, Any]) -> None:
    for item in response.get("output", []):
        _track_output_item(self._history, item)

Realtime provider drain corrupts history on cancellation

High Severity

The on_response_done callback in _RealtimeProvider._setup_chat unconditionally appends output items from the response to _history, regardless of response status. When a stream is cancelled (e.g., barge-in), _cancel_and_drain calls stream.drain() which invokes _on_response_done with the cancelled response data. Partial output items from the cancelled response get tracked in _history, corrupting diff-sync state for subsequent chat() calls. The _WebSocketProvider correctly guards against this by checking response.get("status") == "completed" before calling _finalize_response.
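The status guard the reviewer describes could look roughly like this. A minimal sketch, not the PR's code; `make_on_response_done` is a hypothetical wrapper and the list `history` stands in for `_track_output_item`/`_history`:

```python
from typing import Any, Callable, Dict, List


def make_on_response_done(
    history: List[Dict[str, Any]],
) -> Callable[[Dict[str, Any]], None]:
    """Build a response-done callback that ignores cancelled responses.

    Only responses with status "completed" should update diff-sync
    history; partial output from a cancelled (e.g. barge-in) response
    is dropped so later chat() calls see consistent state.
    """

    def on_response_done(response: Dict[str, Any]) -> None:
        if response.get("status") != "completed":
            return  # drop partial output from cancelled responses
        for item in response.get("output", []):
            history.append(item)  # stands in for _track_output_item

    return on_response_done
```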

Additional Locations (1)
Fix in Cursor Fix in Web


async def _connect(self) -> None:
    """Open a new WS connection and wait for session.created."""
    url = f"{WS_URL}?model={self._model}"

Realtime provider doesn't strip openai/ model prefix

Medium Severity

_RealtimeProvider._connect uses self._model directly in the WebSocket URL (f"{WS_URL}?model={self._model}") without stripping the openai/ prefix. The _is_realtime_model detection function accepts LiteLLM-style names like "openai/gpt-4o-realtime-preview", but the raw name gets embedded in the URL, causing an API error. The _WebSocketProvider has _normalize_openai_model_name for exactly this purpose but the Realtime provider doesn't use it.
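The prefix-stripping helper the reviewer refers to could be as simple as the sketch below. The function name here is illustrative, mirroring the `_normalize_openai_model_name` helper mentioned above rather than reproducing it:

```python
def normalize_openai_model_name(model: str) -> str:
    """Strip a LiteLLM-style "openai/" provider prefix.

    The raw model name (without the prefix) is what should be embedded
    in the Realtime WebSocket URL's ?model= query parameter.
    """
    prefix = "openai/"
    if model.startswith(prefix):
        return model[len(prefix):]
    return model
```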

Fix in Cursor Fix in Web

